﻿using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.SystemMessages;
using EasyNetQ.Topology;
using EasyNetQErrorHandler.Models;
using System;
using System.Threading.Tasks;

namespace EasyNetQErrorHandler
{
    class Program
    {
        private static IBus bus;
        private const string ErrorQueue = "EasyNetQ_Default_Error_Queue";

        static void Main(string[] args)
        {
            var connStr = "host=192.168.0.248;username=admin;password=admin;publisherConfirms=true";
            //方法1：替换默认消息异常处理方法，改为AlwaysRequeueErrorStrategy:当发生异常时，消息重新发到原队列
            bus = RabbitHutch.CreateBus(connStr, x => x.Register<IConsumerErrorStrategy>(_ => new AlwaysRequeueErrorStrategy()));

            //方法2：默认消息异常处理方法，它会把异常消息发送到队列EasyNetQ_Default_Error_Queue中
            //然后另外指定一个消费者来处理这些错误消息,比如HandleErrors
            //bus = RabbitHutch.CreateBus(connStr);
            /*订阅消息*/
            Subscribe();

            /*处理错误队列中的错误数据*/
            //从EasyNetQ默认异常队列中获取消息并处理
            HandleErrors();

            /*发布消息*/
            Console.WriteLine("输入文字，按回车发送消息！");
            while (true)
            {
                var msg = Console.ReadLine();
                bus.Publish(new Question(msg));
            }
        }

        #region 消费者
        private static void Subscribe()
        {
            /*声明两个消费者*/
            bus.SubscribeAsync<Question>("subscriptionId", x => HandleMessageAsync(x).Invoke(1));
            bus.SubscribeAsync<Question>("subscriptionId", x => HandleMessageAsync(x).Invoke(2));
        }

        private static Func<int,Task> HandleMessageAsync(Question question)
        {
            return  async (id) =>
            {
                if (new Random().Next(0, 2) == 0)
                {
                    Console.WriteLine("Exception Happened!!!!");
                    throw new Exception("Error Hanppened!");
                }
                else
                {
                    Console.WriteLine(string.Format("worker：{0}，content：{1}", id, question.Text));
                }
            };
        }
        #endregion

        #region 异常处理

        /// <summary>
        /// 从EasyNetQ默认异常队列中获取消息并处理
        /// </summary>
        private static void HandleErrors()
        {
            Action<IMessage<Error>, MessageReceivedInfo> handleErrorMessage = HandleErrorMessage;

            IQueue queue = new Queue(ErrorQueue, false);
            bus.Advanced.Consume(queue, handleErrorMessage);
        }

        /// <summary>
        /// 处理异常消息
        /// </summary>
        /// <param name="msg"></param>
        /// <param name="info"></param>
        private static void HandleErrorMessage(IMessage<Error> msg, MessageReceivedInfo info)
        {
            //info.
            Console.WriteLine($"catch process。Exchange:{info.Exchange}, RoutingKey:{info.RoutingKey},Queue:{info.Queue},Body:{msg.Body.Message}");
        }
        #endregion
    }
}
